package raw.listener;

import com.fasterxml.jackson.databind.deser.std.StringDeserializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

public class ConsumerBalance {
    private static KafkaConsumer<String, String> consumer;

    /**
     * 存储一个主题多个分区的当前消费偏移量
     */
    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    /**
     *  初始化消费者
     */
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<>(configs);
        //主题订阅
        consumer.subscribe(Collections.singletonList("test1"), new RebalanceListener(consumer));
    }

    /**
     * 初始化配置
     */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "dhy-group");
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 1000);
        props.put("session.timeout.ms", 30000);
        props.put("max.poll.records", 1000);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    public static void main(String[] args) {

        while (true) {
            // 这里的参数指的是轮询的时间间隔，也就是多长时间去拉一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
            records.forEach((ConsumerRecord<String, String> record) -> {
                System.out.println("topic:" + record.topic()
                        + ",partition:" + record.partition()
                        + ",offset:" + record.offset()
                        + ",key:" + record.key()
                        + ",value" + record.value());
                // 每次消费记录消费偏移量，用于一旦发生rebalance时提交
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no matadata"));
            });
            consumer.commitAsync();
        }
    }

    static class RebalanceListener implements ConsumerRebalanceListener {
        KafkaConsumer<String, String> consumer;

        public RebalanceListener(KafkaConsumer<String,String> consumer) {
            this.consumer = consumer;
        }

        /**
         * 在rebalance发生之前和消费者停止读取消息之后被调用
         */
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            consumer.commitSync(currentOffsets);
        }

        /**
         * 在rebalance完成之后(重新分配了消费者对应的分区)，消费者开始读取消息之前被调用。
         */
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.commitSync(currentOffsets);
        }
    }
}